package cn.zhangfusheng.elasticsearch.template;

import cn.zhangfusheng.elasticsearch.thread.ThreadLocalDetail;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * @author fusheng.zhang
 * @date 2022-02-24 15:50:33
 */
public abstract class AbstractElasticSearchRestTemplate
        implements Template, TemplageIngestApi, TemplateDocumentApi,
        TemplateSearchApi, TemplateJpaApi, TemplateMybatisApi, TemplateDynamicStrApi, TemplateDynamicSqlApi {

    private final RestHighLevelClient restHighLevelClient;
    private final BulkProcessor bulkProcessor;

    protected AbstractElasticSearchRestTemplate(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
        this.bulkProcessor = buildBulkProcessor();
    }

    private BulkProcessor buildBulkProcessor() {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                System.out.println("beforeBulk:" + executionId);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                System.out.println("afterBulk:" + executionId);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                System.out.println("afterBulk:" + executionId);
            }
        };
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkRequestActionListenerBiConsumer =
                (bulkRequest, bulkResponseActionListener) ->
                        this.restHighLevelClient.bulkAsync(bulkRequest, ThreadLocalDetail.requestOptions(), bulkResponseActionListener);
        return BulkProcessor.builder(bulkRequestActionListenerBiConsumer, listener)
                .setBulkActions(1000) // 1000 条数据执行一次请求
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB)) // 5MB 数据执行一次
                .setConcurrentRequests(0) //
                .setFlushInterval(TimeValue.timeValueSeconds(3L))
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
                .build();
    }


    @Override
    public RestHighLevelClient restHighLevelClient() {
        return restHighLevelClient;
    }

    @Override
    public BulkProcessor getBulkProcessor() {
        return bulkProcessor;
    }
}
